
Rewrite the parquet output adapter manager#712

Open: arhamchopra wants to merge 11 commits into main from ac/parquet_output_adapter

arhamchopra (Collaborator) commented Jun 12, 2026

Rewrite the parquet output adapter for RecordBatch-based writing

Replaces the old per-format C++ file-writer hierarchy (FileWriterWrapper / ParquetFileWriterWrapper / ArrowIPCFileWriterWrapper / FileWriterWrapperContainer) with a RecordBatch sink architecture: ParquetWriter builds Arrow RecordBatches and hands them to a small RecordBatchSink callback interface, and a single closure-based C++ sink (makeFileSink) writes parquet files, Arrow IPC streams, and split-column directories. This mirrors the input-side rewrite in #704 and shares the same Arrow type machinery.

Motivation

The old output path had:

  • A per-format writer-wrapper class hierarchy plus a FileWriterWrapperContainer to fan out to split-column files
  • A separate DialectGenericListWriterInterface for list columns, duplicating type dispatch already implemented in the shared Arrow layer
  • Per-type column-builder code (StructColumnArrayBuilder) that re-implemented nested-struct serialization the Arrow nodes already do
  • A registered-but-unreachable parquet_dict_basket_output_adapter (dead since basket output goes through the parquet_dict_basket_writer node)

The new implementation:

  • Builds one arrow::RecordBatch per batch_size rows and writes it through a 4-callback sink, so the file backend is a single, swappable component
  • Reuses the shared ArrowFieldWriter (via ArrowBackedArrayBuilder) for every column type — the same serialization code the struct_to_record_batches Arrow node and the input adapter use (scalars, structs, nested structs, lists)
  • Keeps all file I/O in C++ (no per-batch Python / Arrow C Data Interface hop on the write path)
  • Bakes file- and column-level metadata into the Arrow schema in C++, preserved identically in single-file and split-column modes

Architecture

ParquetOutputAdapter (per published column / struct / list)
  └→ ArrowBackedArrayBuilder      — wraps the shared ArrowFieldWriter (scratch or external mode)
ParquetWriter (EndCycleListener)
  ├→ accumulates rows, builds arrow::RecordBatch every batch_size rows
  ├→ bakes file_metadata / column_metadata into the schema in start()
  └→ onStart(schema) / onBatch(rb) / onFileChange(path) / onStop()
RecordBatchSink                   — struct of 4 std::function callbacks
  └→ makeFileSink(...)            — writes files entirely in C++:
        single   : one .parquet / .arrow file with all columns
        split    : a directory, one file per column
        rotation : onFileChange closes the current file and opens the next

RecordBatchSink (new) is a struct of four callbacks (onStart/onBatch/onFileChange/onStop) — the only contract between the writer and the file backend.
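A minimal sketch of what such a four-callback contract could look like, assuming stand-in `Schema` and `Batch` types so it compiles without Arrow. `CallbackSink`, `writeTwoFiles`, and `demoCallOrder` are hypothetical names for illustration, not the PR's code. The call order (schema first, then the first file, then batches) follows the description in this thread.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Stand-ins for arrow::Schema / arrow::RecordBatch (assumption: keeps the sketch Arrow-free).
using Schema = std::vector<std::string>;   // column names only
using Batch  = std::vector<int>;           // one value per row, single column

// Hypothetical mirror of the four-callback contract; any callback may be left unset.
struct CallbackSink
{
    std::function<void( const Schema & )>      onStart;
    std::function<void( const Batch & )>       onBatch;
    std::function<void( const std::string & )> onFileChange;
    std::function<void()>                      onStop;
};

// Writer side: every call is guarded because an unset std::function throws when invoked.
void writeTwoFiles( const CallbackSink & sink )
{
    if( sink.onStart )      sink.onStart( Schema{ "x" } );
    if( sink.onFileChange ) sink.onFileChange( "a.parquet" );
    if( sink.onBatch )      sink.onBatch( Batch{ 1, 2 } );
    if( sink.onFileChange ) sink.onFileChange( "b.parquet" );   // rotation
    if( sink.onBatch )      sink.onBatch( Batch{ 3 } );
    if( sink.onStop )       sink.onStop();
}

// Backend side: a sink that only records which callbacks ran, in order.
std::vector<std::string> demoCallOrder()
{
    std::vector<std::string> events;
    CallbackSink sink;
    sink.onStart      = [&events]( const Schema & ) { events.push_back( "start" ); };
    sink.onBatch      = [&events]( const Batch & b ) { events.push_back( "batch:" + std::to_string( b.size() ) ); };
    sink.onFileChange = [&events]( const std::string & path ) { events.push_back( "file:" + path ); };
    sink.onStop       = [&events] { events.push_back( "stop" ); };
    writeTwoFiles( sink );
    return events;
}
```

Because the writer only sees the four callbacks, a recording sink like this one can stand in for the file backend in tests.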

makeFileSink (new, RecordBatchFileSink.cpp) returns a closure-based sink (no class hierarchy) that owns overwrite checks, parent-directory creation, file rotation, compression, and the optional file_visitor. Compression is resolved through Arrow's own arrow::util::Codec API rather than a hardcoded name map.

ArrowBackedArrayBuilder (new) bridges csp's row-at-a-time "may-not-tick → null" model onto the batch-oriented shared ArrowFieldWriter, in two modes: scratch (single-field struct the tick writes into) and external (reads a field directly from a published struct).
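The "may-not-tick → null" bridging can be illustrated with a small sketch. It assumes one output slot per engine cycle and uses `std::optional` as a stand-in for an Arrow null. `NullFillingColumnBuilder` and its methods are hypothetical names, and the real builder delegates to ArrowFieldWriter rather than storing values itself.

```cpp
#include <cassert>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical column builder: one slot per cycle, null when the input did not tick.
template<typename T>
class NullFillingColumnBuilder
{
public:
    // Called when the input ticks during the current cycle.
    // In this sketch a later tick in the same cycle overwrites an earlier one.
    void setValue( T value ) { m_pending = std::move( value ); }

    // Called once per cycle: appends the pending value, or a null if nothing ticked.
    void endCycle()
    {
        m_column.push_back( std::move( m_pending ) );
        m_pending.reset();   // a moved-from optional still reports has_value(), so clear it
    }

    // Hands the accumulated column to the caller and starts a fresh batch.
    std::vector<std::optional<T>> finishBatch()
    {
        std::vector<std::optional<T>> out;
        out.swap( m_column );
        return out;
    }

private:
    std::optional<T>              m_pending;
    std::vector<std::optional<T>> m_column;
};

// Three cycles: tick, no tick, tick.
std::vector<std::optional<int>> demoNullFill()
{
    NullFillingColumnBuilder<int> builder;
    builder.setValue( 10 );
    builder.endCycle();
    builder.endCycle();          // nothing ticked this cycle
    builder.setValue( 30 );
    builder.endCycle();
    return builder.finishBatch();
}
```

Every column grows by exactly one slot per cycle, which is what keeps all columns the same length when a batch is built.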

ParquetOutputAdapterManager is simplified to orchestrate the writer + per-basket dict writers and wire the sink (and a per-basket sink factory) at construction.

What's removed

  • FileWriterWrapper / ParquetFileWriterWrapper / ArrowIPCFileWriterWrapper / FileWriterWrapperContainer — the old per-format C++ file-writer hierarchy
  • DialectGenericListWriterInterface — list columns now go through ArrowBackedArrayBuilder
  • StructColumnArrayBuilder — nested-struct columns are written by the shared ArrowFieldWriter::NestedStructWriter
  • parquet_dict_basket_output_adapter — registered but unreachable dead adapter
  • A large slice of ParquetOutputAdapter.cpp / ArrowSingleColumnArrayBuilder.h per-type boilerplate, folded into the shared Arrow writer

Bug fixes / hardening

Surfaced by a multi-model review of the new sink:

  • mkdir("") on a bare filename — writing to a relative path with no directory component (e.g. "out.parquet") no longer fails with Invalid argument; the empty dirname is guarded.
  • Close-path exception safety — the current writer is reset before the user file_visitor runs (no double-close if it throws), and the output stream / all split sub-writers are always closed even if one close fails (no leaked file descriptors).
  • Compression resolution via Arrow — codecs are resolved through arrow::util::Codec::GetCompressionType + IsAvailable (case-insensitive, tracks whatever the Arrow build supports, clear error otherwise) instead of a hardcoded lowercase map.
  • Explicit writeTimestampColumn honored — an explicitly requested timestamp column is no longer silently downgraded.
  • Single-file + dict basket fails fast — publishing a dict basket without split_columns_to_files=True now raises a clear error instead of a low-level IOError.
  • FileExistsError — writing over an existing file with allow_overwrite=False raises Python FileExistsError.
  • Removed a dead manager-level index sink and made adapter-manager stop() flush/close all writers before destroying any of them.
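Two of the fixes above follow well-known patterns that are easy to sketch in isolation. The helper names `ensureParentDir` and `closeAll` are hypothetical, and the code assumes C++17 `std::filesystem` rather than whatever directory API the sink actually uses. The first helper skips directory creation when the path has no directory component. The second runs every closer even if one throws, then rethrows the first failure.

```cpp
#include <cassert>
#include <exception>
#include <filesystem>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Creates the parent directory of `path` if it has one.
// Returns false for a bare filename such as "out.parquet", where there is nothing to create.
bool ensureParentDir( const std::string & path )
{
    const std::filesystem::path parent = std::filesystem::path( path ).parent_path();
    if( parent.empty() )
        return false;
    std::filesystem::create_directories( parent );
    return true;
}

// Runs every closer, remembering only the first exception, and rethrows it at the end.
void closeAll( const std::vector<std::function<void()>> & closers )
{
    std::exception_ptr firstError;
    for( const auto & close : closers )
    {
        try { close(); }
        catch( ... ) { if( !firstError ) firstError = std::current_exception(); }
    }
    if( firstError )
        std::rethrow_exception( firstError );
}

// Three closers, the middle one throws; returns how many actually ran.
int demoCloseAll()
{
    int closed = 0;
    std::vector<std::function<void()>> closers = {
        [&closed] { ++closed; },
        [&closed] { ++closed; throw std::runtime_error( "close failed" ); },
        [&closed] { ++closed; },
    };
    try { closeAll( closers ); }
    catch( const std::runtime_error & ) { return closed; }
    return -1;   // not reached: closeAll rethrows the middle closer's error
}
```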

Performance

This is a maintainability/simplification change, not a performance optimization, and it is performance-neutral relative to main:

  • The dominant cost on the write path is the parquet/Arrow encode (WriteTable / IPC serialization). That code is unchanged here and produces byte-identical output to main, so end-to-end write time is at parity with main.
  • The struct → Arrow conversion the adapter itself performs is marginally slower than main on a microbenchmark. Routing every column through the shared ArrowFieldWriter (rather than main's per-type column builders) trades a small amount of per-cell speed for a large reduction in duplicated serialization code: on a 2M-row × 10-column workload the isolated per-row append loop is ~10–15% slower than main. That loop is a small fraction of the encode cost and is not observable end-to-end, where the parquet encode (and, in typical graphs, the upstream engine/feed) dominates the run.

A direct typed-scalar-builder variant that closes the conversion gap was prototyped and measured: it recovers the append-loop difference but moves the end-to-end number by <0.5%, so it was not worth reintroducing per-type builder code into the simplified design.

A whole-file accuracy harness (schema, row groups, compression codec, metadata, and values) confirms the new sink produces logically identical output to main across 19 scenarios.

API compatibility

The public Python API (ParquetWriter, ParquetOutputConfig, publish, publish_struct, publish_dict_basket, filename_provider, file_visitor, file_metadata / column_metadata) is unchanged. csp/tests/adapters/test_parquet_output.py (65 tests, covering all scalar/struct/list/numpy types, batch-size→row-group counts, compression codecs, Arrow IPC, split-column, rotation, dict baskets, metadata, overwrite/FileExistsError) and the existing test_parquet.py suite pass.

ParquetWriter builds RecordBatches and hands them to a pluggable
RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++
file-writer hierarchy and unifies output conversion via visitCspValueType.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Remove dead StructColumnArrayBuilder and parquet_dict_basket_output_adapter,
de-virtualize scheduleEndCycleEvent, propagate file metadata to per-column
files in split mode, and expand output tests.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Closure-based C++ sink writes parquet/IPC/split-column files directly,
removing the per-batch C++<->Python hop. Adds FileExistsError.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

From the multi-model review:
- delete the now-dead Python sink (_parquet_rb_writer.py, rb_sink wiring,
  TestOutputSinkDirect) since file I/O is all C++ now
- RecordBatchFileSink: guard mkdir(""), fix close-path exception safety,
  resolve compression via Arrow's Codec API (case-insensitive)
- honor an explicit writeTimestampColumn
- fail fast on single-file + dict basket
- add regression tests

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Remove the unused manager-level m_indexSink/setIndexSink (per-basket index
sinks are unaffected) and stop all writers before destroying any in stop().

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Encapsulate the scratch isSet reset behind StructField::clearIsSet, add a
debug-only length assert in buildRecordBatch, and document that file_visitor
runs synchronously on the engine thread.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
arhamchopra marked this pull request as ready for review June 13, 2026 02:27
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>

ptomecek (Collaborator) left a comment

I went through the whole output path (ParquetWriter -> RecordBatchSink -> makeFileSink), the dict-basket teardown, the Python wiring, and compared everything against the old FileWriterWrapper hierarchy. This is a clean rewrite. Each of the hardening fixes is real and correctly done: the mkdir guard for bare filenames, the close-path exception safety (no double-close, all sub-writers and the stream get closed even if one throws), compression resolved through Arrow's codec API, the explicit timestamp column being honored, the single-file + dict-basket fast failure, FileExistsError on overwrite, and the stop()-stops-everything-before-destroying-anything teardown. Arrow Status/Result values are checked at every call site. I didn't find any correctness, resource, or concurrency bugs.

Two low-severity things inline, plus one parity note I couldn't attach to a line:

Nested-struct field ordering: ArrowFieldWriter::NestedStructWriter (in ArrowFieldWriter.cpp, which isn't part of this PR) orders the arrow struct's child fields by declaration order, whereas the old code used the struct's memory-layout order. It reads back fine through the new name-based reader, but the on-disk child order differs from files written by older csp, which matters for anything reading those columns positionally. A test that pins down the struct-within-struct schema shape would be worth adding.

Overall this looks good to merge. The one thing I'd sort out first is the silently-dropped column below, since it turns an error into missing data.

Comment thread cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp
Comment thread cpp/csp/adapters/parquet/ParquetWriter.cpp Outdated
Comment thread csp/tests/adapters/test_parquet_output.py
Comment thread csp/tests/adapters/test_parquet_output.py
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
arhamchopra requested a review from ptomecek June 17, 2026 20:33
ptomecek (Collaborator) commented

Looks good to me now

ptomecek previously approved these changes Jun 18, 2026

AdamGlustein (Collaborator) left a comment

As a mere human who attempted reading through this code, I find it very difficult to follow with the lambda-based approach. First, figuring out what function is actually being called is not easy, and deciphering what actually makes it into the closure of that function is even harder. It would be much simpler (in my opinion) to make these callbacks member functions and replace the lambda capture variables with member variables for these classes.

Also, I think some of the complexity here was necessitated by being backwards compatible with the original Parquet adapter. In my opinion, I think we can relax that constraint. There are a lot of overly specific things (like symbol columns, etc.) that we don't need to support and can just drop completely from the code. With a minor version bump we're not going to break any contracts, and I highly doubt some of these vestigial features are used by anyone anymore.

catch( ... ) { if( !firstError ) firstError = std::current_exception(); }
};

auto && indexBuilder = m_cycleIndexOutputAdapter -> getColumnArrayBuilder( 0 );

Collaborator

This code is the same as cpp/csp/adapters/parquet/ParquetDictBasketOutputWriter.cpp:105, make it a private helper method and call it flushIndexBatch() or something

Collaborator Author

Done — extracted the duplicated index-flush block into a private flushIndexBatch() helper. It's now called from onEndCycle() (chunk full), onFileNameChange() (rotation), and stop() (final flush), so the build-RecordBatch-and-hand-to-sink logic lives in one place.
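A rough sketch of the shape this refactor takes, assuming an index column of plain integers and a sink that is a single callable. `IndexWriterSketch` and `demoFlushSizes` are hypothetical names, and the real helper builds an Arrow RecordBatch rather than passing a vector. The point is that all three call sites share one private flush.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

using IndexBatch = std::vector<int>;   // stand-in for the index RecordBatch

class IndexWriterSketch
{
public:
    IndexWriterSketch( std::size_t chunkSize, std::function<void( const IndexBatch & )> sink )
        : m_chunkSize( chunkSize ), m_sink( std::move( sink ) ) {}

    // Chunk full: flush as soon as chunkSize rows have accumulated.
    void onEndCycle( int index )
    {
        m_pending.push_back( index );
        if( m_pending.size() >= m_chunkSize )
            flushIndexBatch();
    }

    // Rotation: rows gathered so far belong to the file being closed.
    void onFileNameChange() { flushIndexBatch(); }

    // Final flush of whatever partial chunk is left.
    void stop() { flushIndexBatch(); }

private:
    // The single place that hands pending rows to the sink.
    void flushIndexBatch()
    {
        if( m_pending.empty() )
            return;
        m_sink( m_pending );
        m_pending.clear();
    }

    std::size_t                               m_chunkSize;
    std::function<void( const IndexBatch & )> m_sink;
    IndexBatch                                m_pending;
};

// Chunk size 2: one full chunk, one flush on rotation, one flush on stop.
std::vector<std::size_t> demoFlushSizes()
{
    std::vector<std::size_t> sizes;
    IndexWriterSketch writer( 2, [&sizes]( const IndexBatch & b ) { sizes.push_back( b.size() ); } );
    writer.onEndCycle( 0 );
    writer.onEndCycle( 1 );      // chunk full
    writer.onEndCycle( 2 );
    writer.onFileNameChange();   // rotation flushes the partial chunk
    writer.onEndCycle( 3 );
    writer.stop();               // final flush
    return sizes;
}
```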

auto field = arrowBuilder -> scratchField();
m_columnArrayBuilder = arrowBuilder;

csp::adapters::arrow::visitCspValueType( type -> type(),

Collaborator

I don't think we need m_type at all in this class anymore, we just use it to check if the type is supported at construction time and we have type available in the constructor anyways (just makes it more confusing that we check type -> type() for validity but throw the error with m_type -> type(); from what I see these are always the same)

Collaborator Author

Done — removed m_type from ParquetOutputHandler entirely. The base ctor no longer takes or stores the type, and the unsupported-type error now reports the local type param, so the validity check and the error message reference the same value.


RecordBatchSink sink;
sink.onStart = [schemaHolder]( const std::shared_ptr<::arrow::Schema> & schema ) { *schemaHolder = schema; };
sink.onBatch = [current]( const std::shared_ptr<::arrow::RecordBatch> & rb )

Collaborator

Some of the design here seems overly complicated to me, for example why do all of these callbacks (onStart, onBatch, onFileChange) need to be stored as lambda function members on the object? Can't they just be plain old member functions on the RecordBatchSink? I find it's quite hard to follow this design pattern

Collaborator Author

Done — reworked this away from the closure-based design. The four callbacks are now plain member functions (onStart/onBatch/onFileChange/onStop) and the previously-captured state is member variables (m_schema, m_current, m_currentPath, plus the config). No more lambda captures to trace through.

I also collapsed the abstract RecordBatchSink interface and the concrete RecordBatchFileSink into a single concrete RecordBatchSink class — there's only one implementation, so the interface/unique_ptr/virtual dispatch wasn't earning its keep. The file is now RecordBatchSink.cpp.
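For readers following the thread, here is a hedged sketch of the member-function shape described in this reply. It uses stand-in `Schema` and `Batch` types and counts rows in memory instead of writing files. `MemberFunctionSink`, `m_open`, `rowsPerFile`, and the decision to throw from `onBatch` when no file is open are assumptions of the sketch, not details confirmed by the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using Schema = std::vector<std::string>;   // stand-in for arrow::Schema
using Batch  = std::vector<int>;           // stand-in for arrow::RecordBatch

class MemberFunctionSink
{
public:
    void onStart( Schema schema ) { m_schema = std::move( schema ); }

    // Closes the current file and opens the next; returns true if a file is open afterwards.
    bool onFileChange( const std::string & path )
    {
        closeWriter();
        m_currentPath = path;
        if( m_currentPath.empty() )
            return false;
        openWriter();
        return true;
    }

    void onBatch( const Batch & batch )
    {
        if( !m_open )
            throw std::runtime_error( "onBatch called with no open file" );   // sketch-only choice
        m_rowsPerFile[ m_currentPath ] += batch.size();
    }

    void onStop() { closeWriter(); }

    const std::map<std::string, std::size_t> & rowsPerFile() const { return m_rowsPerFile; }

private:
    void openWriter()  { m_rowsPerFile.emplace( m_currentPath, 0 ); m_open = true; }
    void closeWriter() { m_open = false; }

    // State that the closure-based version held in lambda captures.
    Schema                             m_schema;
    std::string                        m_currentPath;
    bool                               m_open = false;
    std::map<std::string, std::size_t> m_rowsPerFile;
};

// Caller side: the open/closed flag comes straight from the bool return.
std::map<std::string, std::size_t> demoRotation()
{
    MemberFunctionSink sink;
    sink.onStart( Schema{ "x" } );
    bool fileOpen = sink.onFileChange( "a.parquet" );
    if( fileOpen ) sink.onBatch( Batch{ 1, 2, 3 } );
    fileOpen = sink.onFileChange( "b.parquet" );
    if( fileOpen ) sink.onBatch( Batch{ 4 } );
    fileOpen = sink.onFileChange( "" );   // empty path: nothing is open
    if( fileOpen ) sink.onBatch( Batch{ 5 } );
    sink.onStop();
    return sink.rowsPerFile();
}
```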

initFileWriterContainer( arrow::schema( arrowFields, m_fileMetaData ) );

m_schema = ::arrow::schema( arrowFields, m_fileMetaData );
if( m_sink.onStart )

Collaborator

This code is quite confusing to me.
All onStart does (from what I can see) is transfer the schema to the sink object.
Yet m_fileOpen is true after. I can't even really see where we open the file in onFileChange either.
Overall let's get rid of all these lambda function callbacks if we can, trying to figure out a) which function is actually being used and b) the closure, is a nightmare when looking at the code.

Collaborator Author

Done — start() should read clearly now: it hands the schema to the sink via onStart(), then does m_fileOpen = m_sink->onFileChange(fileName) to open the first file (if any). The actual file open happens inside RecordBatchSink::onFileChange (via openWriter()), and m_fileOpen is set from that method's bool return rather than being inferred. With the callbacks now being ordinary member functions, it's no longer a question of which lambda runs.

flushBatch();
if( m_sink.onFileChange )
m_sink.onFileChange( fileName );
m_fileOpen = !fileName.empty();

Collaborator

This seems like a weird way of checking if the file change was successful (emptying the filename in the function that was called?). Why not just return a bool from m_sink.onFileChange if it was successful?

Collaborator Author

Done — onFileChange now returns bool (true if a file is open after the call, i.e. the path was non-empty). ParquetWriter sets m_fileOpen directly from that return value, so the "empty the filename to signal success" trick is gone.

Comment thread csp/adapters/output_adapters/parquet.py Outdated

Collaborator

Don't see us using this, we just hardcode 2.6 at cpp/csp/adapters/parquet/RecordBatchFileSink.cpp:83

Collaborator Author

Done — removed the unused PARQUET_VERSION class attribute and the _get_default_parquet_version() helper, along with the now-dead importlib.metadata / packaging imports. As you noted, the C++ writer hardcodes PARQUET_2_6, so this value never flowed anywhere.

{
switch( m_type -> type() )
bool isBytes = false;
CspTypePtr effectiveType = type;

Collaborator

Any reason for this effectiveType variable?

Collaborator Author

No reason — it was just an unmodified copy of type. Removed it and pass type straight to createArrowBackedArrayBuilder.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>